#!/usr/bin/python3

# This file is part of Cockpit.
#
# Copyright (C) 2013 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see <http://www.gnu.org/licenses/>.

import base64
import subprocess
import time

import parent
from testlib import *


class TestLogin(MachineCase):

    def testBasic(self):
        # Exercises the login page end to end: the optional banner, rejected
        # logins (unknown user, wrong password, user outside the admin group,
        # disabled shell), a successful admin login including re-login via
        # cookie, and the visibility of the login options depending on
        # cockpit.conf and on the presence of cockpit-ssh.
        m = self.machine
        b = self.browser

        # Setup users and passwords
        m.execute("useradd user -c 'Barney Bär' || true")
        m.execute("echo user:abcdefg | chpasswd")

        # Two PAM "account" lines; the trailing backslash + newline lets sed's
        # "a" (append) command below insert both lines at once.
        admins_only_pam = """account    sufficient   pam_succeed_if.so uid = 0\\
account    required     pam_succeed_if.so user ingroup %s""" % m.get_admin_group()

        # Setup a special PAM config that only lets root (uid 0) and members
        # of the admin group pass the account stage
        def deny_non_root(remote_filename):
            # The lines are appended after the line matching "nologin";
            # "|| true" keeps this best-effort when sed fails.
            m.execute("""sed -i '/nologin/a %s' %s || true""" % (admins_only_pam, remote_filename))

        deny_non_root("/etc/pam.d/cockpit")
        deny_non_root("/etc/pam.d/sshd")

        m.start_cockpit()
        b.open("/system")

        def login(user, password):
            # Wait for a usable, focused login form, then submit the credentials.
            b.wait_visible("#login")
            b.wait_visible("#login-user-input")
            b.wait_not_visible("#unsupported-browser")
            b.wait_not_present("#login-button:disabled")
            # :focus selector doesn't work in phantomjs
            b.wait_js_cond("document.activeElement.getAttribute('id') === 'login-user-input'")
            b.set_val('#login-user-input', user)
            b.set_val('#login-password-input', password)
            b.set_checked('#authorized-input', True)
            b.click('#login-button')

        # Test banner
        # Test that we don't show banner when not specified
        b.wait_visible("#login-user-input")
        b.wait_not_visible("#banner")

        m.execute("printf '[Session]\nBanner = /etc/issue\n' > /etc/cockpit/cockpit.conf")
        m.restart_cockpit()
        b.reload()
        b.wait_visible("#login-user-input")
        b.wait_visible("#banner")
        self.assertEqual(b.text("#banner pre"), m.execute("cat /etc/issue").rstrip())

        # Test non existent file
        m.execute("printf '[Session]\nBanner = /etc/non-existing-file\n' > /etc/cockpit/cockpit.conf")
        self.allow_journal_messages("error loading contents of banner: Failed to open file “/etc/non-existing-file”: No such file or directory")
        m.restart_cockpit()
        b.reload()
        b.wait_visible("#login-user-input")
        b.wait_not_visible("#banner")

        # Try to login as a non-existing user
        login("nonexisting", "blahblah")
        b.wait_text_not("#login-error-message", "")
        self.assertNotIn("web", m.execute("who"))

        # Try to login as user with a wrong password
        login("user", "gfedcba")
        b.wait_text_not("#login-error-message", "")
        self.assertNotIn("web", m.execute("who"))

        # Try to login as user with correct password; this must still be
        # refused because of the PAM account restriction set up above
        login("user", "abcdefg")
        if m.ostree_image:
            b.wait_in_text("#login-error-message", "Server closed connection")
        else:
            b.wait_text("#login-error-message", "Permission denied")
        self.assertNotIn("web", m.execute("who"))

        # Try to login with disabled shell; this does not work on OSTree where
        # we log in through ssh
        if not m.ostree_image:
            m.execute("usermod --shell /bin/false admin; sync")
            b.reload()
            login("admin", "foobar")
            b.wait_text_not("#login-error-message", "")
            m.execute("usermod --shell /bin/bash admin; sync")

        # Login as admin
        b.open("/system")
        login("admin", "foobar")
        b.expect_load()
        b.wait_present("#content")
        b.wait_text('#content-user-name', 'Administrator')

        if m.image not in ['fedora-coreos']: # logs in via ssh, not cockpit-session
            self.assertRegex(m.execute("who"), "^admin *web")

        # reload, which should log us in with the cookie
        b.reload()
        b.wait_present("#content")
        b.wait_text('#content-user-name', 'Administrator')

        if m.image not in ['fedora-coreos']: # logs in via ssh, not cockpit-session
            self.assertRegex(m.execute("who"), "^admin *web")

        b.click("#content-user-name")
        b.click('#go-account')
        b.enter_page("/users")
        b.wait_text("#account-user-name", "admin")

        # The journal must not contain cockpit-session debug messages: grep
        # finding nothing makes the pipeline fail, which is the expected
        # outcome here. assertRaises (unlike a bare assert) is not stripped
        # when Python runs with -O.
        with self.assertRaises(subprocess.CalledProcessError,
                               msg="cockpit-session debug messages found"):
            m.execute("journalctl -p 7 SYSLOG_IDENTIFIER=cockpit-ws | grep 'cockpit-session: opening pam session'")

        # Change login screen options
        b.logout()
        b.wait(lambda: "web console" not in m.execute("who"))
        b.wait_visible("#option-group")
        m.execute("printf '[WebService]\nLoginTo = false\n' > /etc/cockpit/cockpit.conf")
        m.restart_cockpit()
        b.open("/system")
        b.wait_not_visible("#option-group")

        # Without any configuration the options are displayed by default
        m.execute("rm /etc/cockpit/cockpit.conf")
        m.restart_cockpit()
        b.open("/system")
        b.wait_visible("#option-group")

        # And now we remove cockpit-ssh which affects the default
        if not m.ostree_image:
            m.execute("rm -f /usr/libexec/cockpit-ssh /usr/lib/cockpit/cockpit-ssh")
            m.restart_cockpit()
            b.open("/system")
            b.wait_not_visible("#option-group")

            # login with user shell that prints some stdout/err noise
            # having stdout output in ~/.bashrc confuses docker, so don't run on Atomic
            m.execute("echo 'echo noise-rc-out; echo noise-rc-err >&2' >> ~admin/.bashrc")
            m.execute("echo 'echo noise-profile-out; echo noise-profile-err >&2' >> ~admin/.profile")
            login("admin", "foobar")
            b.expect_load()
            b.wait_present("#content")

        # Raw strings: these are regular expressions and "\(" is not a valid
        # escape sequence in an ordinary string literal.
        self.allow_journal_messages("Returning error-response ... with reason .*",
                                    r"pam_unix\(cockpit:auth\): authentication failure; .*",
                                    r"pam_unix\(cockpit:auth\): check pass; user unknown",
                                    r"pam_succeed_if\(cockpit:auth\): requirement .* not met by user .*")

        self.allow_restart_journal_messages()

    def testExpired(self):
        # Logging in with an expired password must lead through the password
        # change conversation on the login page before the session starts.
        machine = self.machine
        browser = self.browser

        def wait_conversation_form():
            # Only the conversation input is shown, not the user/password fields.
            browser.wait_visible("#conversation-group")
            browser.wait_not_visible("#password-group")
            browser.wait_not_visible("#user-group")

        def wait_feedback(text):
            # On OSTree images the text shows up in the prompt element,
            # elsewhere in the message element.
            if machine.ostree_image:
                selector = "#conversation-prompt"
            else:
                selector = "#conversation-message"
            browser.wait_in_text(selector, text)

        def answer(prompt, reply):
            # Wait for the given prompt and submit a reply to it.
            wait_conversation_form()
            browser.wait_in_text("#conversation-prompt", prompt)
            browser.set_val('#conversation-input', reply)
            browser.click('#login-button')

        # On OSTree this happens over ssh
        if machine.ostree_image:
            for command in (
                "sed -i 's/.*ChallengeResponseAuthentication.*/ChallengeResponseAuthentication yes/' /etc/ssh/sshd_config",
                "( ! systemctl is-active sshd.socket || systemctl stop sshd.socket) && systemctl restart sshd.service",
            ):
                machine.execute(command, direct=True)

        # Force a password change on the next login
        machine.execute("chage -d 0 admin")
        machine.start_cockpit()
        browser.open("/system")

        # Regular login form first
        browser.wait_visible("#login")
        browser.wait_not_visible("#conversation-group")
        browser.wait_visible("#password-group")
        browser.wait_visible("#user-group")
        browser.set_val('#login-user-input', "admin")
        browser.set_val('#login-password-input', "foobar")
        browser.click('#login-button')

        # Asked to change the password: confirm the current one
        wait_conversation_form()
        wait_feedback("You are required to change your password")
        browser.set_val('#conversation-input', 'foobar')
        browser.click('#login-button')

        # A bad new password is rejected with a message
        answer("New password", 'admin')
        wait_feedback("BAD PASSWORD")

        # A better password is accepted ...
        browser.wait_not_present("#login-button:disabled")
        answer("New password", '123foobar!@#')

        # ... but retyping it wrong gives a message
        answer("Retype", '123foobar!')  # wrong
        wait_feedback("passwords do not match")

        # Enter the new password again and retype it correctly this time
        answer("New password", '123foobar!@#')
        answer("Retype", '123foobar!@#')

        browser.expect_load()
        browser.wait_present("#content")
        browser.wait_text('#content-user-name', 'Administrator')

        expected_noise = (
            'Error executing command as another user: Not authorized',
            'This incident has been reported.',
            'sudo: a password is required',
            'sudo: sorry, you must have a tty to run sudo',
        )
        self.allow_journal_messages(*expected_noise)
        self.allow_restart_journal_messages()
        self.allow_authorize_journal_messages()

    def testConversation(self):
        # Adds a mock PAM module that asks an extra question during login and
        # checks that a wrong answer fails while the right one logs in.
        machine = self.machine
        browser = self.browser
        module_path = "/usr/lib/cockpit-test-assets/mock-pam-conv-mod.so"

        if machine.ostree_image:
            # Logins go through sshd here, so that is the PAM stack to change
            pam_conf = "/etc/pam.d/sshd"
            for command in (
                "sed -i 's/.*ChallengeResponseAuthentication.*/ChallengeResponseAuthentication yes/' /etc/ssh/sshd_config",
                "( ! systemctl is-active sshd.socket || systemctl stop sshd.socket) && systemctl restart sshd.service",
            ):
                machine.execute(command, direct=True)
        else:
            pam_conf = "/etc/pam.d/cockpit"

        # Insert the module as a required auth step after line 5 of the PAM file
        machine.execute("sed -i '5 a auth       required    {0}' {1}".format(module_path, pam_conf))

        machine.start_cockpit()
        browser.open("/system")

        def login(user, password):
            # Fill in and submit the regular user/password form.
            browser.wait_visible("#login")
            browser.wait_not_visible("#conversation-group")
            browser.wait_visible("#password-group")
            browser.wait_visible("#user-group")
            browser.set_val('#login-user-input', user)
            browser.set_val('#login-password-input', password)
            browser.set_checked('#authorized-input', True)
            browser.click('#login-button')

        def answer_question(reply):
            # Wait for the mock module's question and submit a reply.
            browser.wait_visible("#conversation-group")
            browser.wait_not_visible("#password-group")
            browser.wait_not_visible("#user-group")
            browser.wait_in_text("#conversation-prompt", "life the universe")
            browser.set_val('#conversation-input', reply)
            browser.click('#login-button')

        # Try to login as a non-existing user, giving the wrong answer
        login("nonexisting", "blahblah")
        answer_question('43')
        browser.wait_text_not("#login-error-message", "")

        # The admin with the right answer gets in
        login("admin", "foobar")
        answer_question('42')

        browser.expect_load()
        browser.wait_present("#content")
        browser.wait_text('#content-user-name', 'Administrator')

        self.allow_restart_journal_messages()

    def curl_auth(self, url, userpass):
        """Request `url` from the machine's web port using HTTP Basic auth.

        `userpass` is base64-encoded as-is into the Authorization header.
        Returns curl's standard output as text; because of `-D -` this
        includes the response headers.
        """
        credentials = base64.b64encode(userpass.encode()).decode()
        auth_header = "Authorization: Basic " + credentials
        target = 'http://{0}:{1}{2}'.format(self.machine.web_address, self.machine.web_port, url)
        command = ['/usr/bin/curl', '-s', '-k', '-D', '-', '--header', auth_header, target]
        return subprocess.check_output(command, universal_newlines=True)

    def curl_auth_code(self, url, userpass):
        """Return the integer status code from the first line of curl_auth()'s output.

        The first line is expected to consist of three space-separated
        parts, the second of which is the numeric code.
        """
        response = self.curl_auth(url, userpass).splitlines()
        assert len(response) > 0
        status_line = response[0]
        fields = status_line.split(' ', 2)
        assert len(fields) == 3
        _version, code, _reason = fields
        return int(code)

    def testRaw(self):
        # Sends hand-crafted (empty, malformed, wrong or oversized) Basic
        # credentials straight to /cockpit/login and checks that each request
        # is answered with HTTP status 401.
        self.machine.start_cockpit()
        # NOTE(review): presumably gives the freshly started service a moment
        # to accept connections — confirm
        time.sleep(0.5)

        rejected_credentials = [
            '',
            'foo:',
            'foo:bar\n',
            'foo:bar:baz',
            ':\n\n',
            'admin:bar',
            'foo:bar',
            'admin:' + 'x' * 4000,
            'x' * 4000 + ':bar',
            'a' * 4000 + ':',
            'a' * 4000 + ':b\nc',
            'a' * 4000 + ':b\nc\n',
        ]
        for userpass in rejected_credentials:
            # Truncate in the failure message; some inputs are 4000+ characters
            self.assertEqual(self.curl_auth_code('/cockpit/login', userpass), 401,
                             "unexpected status for credentials %r" % (userpass[:40],))

        # Raw strings: these are regular expressions and "\(" is not a valid
        # escape sequence in an ordinary string literal.
        self.allow_journal_messages("Returning error-response ... with reason .*",
                                    r"pam_unix\(cockpit:auth\): authentication failure; .*",
                                    r"pam_unix\(cockpit:auth\): check pass; user unknown",
                                    r"pam_succeed_if\(cockpit:auth\): requirement .* not met by user .*",
                                    "couldn't parse login input: Malformed input",
                                    "couldn't parse login input: Authentication failed")

    @skipImage("No SELinux", "debian-stable", "debian-testing", "ubuntu-stable", "ubuntu-1804")
    @skipImage("No semanage", "fedora-coreos")
    def testSELinuxRestrictedUser(self):
        # Logs in as a user mapped to the SELinux user user_u and checks that
        # the restart/shutdown controls are disabled, then maps admin to
        # sysadm_u and checks that the restart dialog can be opened.
        m = self.machine
        b = self.browser

        # non-admin user_u
        m.execute("useradd unpriv; echo 'unpriv:foobar' | chpasswd; semanage login -a -s user_u unpriv")
        m.start_cockpit()
        b.login_and_go("/system", user="unpriv")
        # not an admin
        b.wait_present("#restart-button[disabled][data-stable=yes]")
        b.wait_present("#shutdown-group[disabled][data-stable=yes]")

        b.logout()
        self.allow_authorize_journal_messages()
        # not allowed to restricted users
        self.allow_journal_messages(".*/var/lib/cockpit/machines.json.*Permission denied")
        self.allow_journal_messages('.*type=1400.*avc:  denied  { map }.*comm="cockpit-pcp".*')
        self.allow_journal_messages('.*type=1400.*avc:  denied .* comm="sudo".*')
        if m.image in ["fedora-30", "fedora-testing", "centos-8-stream"]:
            # these images produce additional sudo/pkexec noise
            # NOTE(review): the original comment called these "older releases" — confirm
            self.allow_journal_messages("sudo: .*setresuid.*: Operation not permitted")
            self.allow_journal_messages("sudo: no valid sudoers.*")
            self.allow_journal_messages("sudo: unable to initialize policy plugin")
            self.allow_journal_messages('.*type=1400.*avc:  denied .* comm="pkexec".* scontext=user_u.*')

        # sysadm_u
        m.execute("semanage login -a -s sysadm_u admin")
        b.login_and_go("/system")
        # shutdown button should be enabled and working
        b.click("#restart-button[data-stable=yes]")
        b.wait_popup("shutdown-dialog")
        b.wait_in_text("#shutdown-dialog .btn-danger", 'Restart')
        # dismiss the dialog without actually restarting
        b.click('#shutdown-dialog button[data-dismiss="modal"]')
        b.wait_popdown("shutdown-dialog")

    def testUnsupportedBrowser(self):
        # With WebSocket missing from the page, only the "unsupported browser"
        # notice may be shown; the login form and its details stay hidden.
        machine, browser = self.machine, self.browser

        machine.start_cockpit()
        # pretend browser doesn't support a required capability
        browser.cdp.invoke("Page.addScriptToEvaluateOnNewDocument", source="window.WebSocket = undefined;")
        browser.open("/system")

        browser.wait_visible("#unsupported-browser")
        for hidden_selector in ("#login-fatal", "#login", "#login-details"):
            browser.wait_not_visible(hidden_selector)

    @skipImage("Starting on OSTree is weird", "fedora-coreos")
    def testFailingWebsocket(self, safari=False, cacert=False):
        # Makes cockpit-ws reject WebSocket connections and checks the early
        # failure page. `safari` switches the user agent to a Safari one and
        # `cacert` controls whether a self-signed CA file exists; the Safari
        # certificate help is expected only when both are set.
        machine = self.machine
        browser = self.browser

        # Cause cockpit-ws to reject WebSocket connections
        machine.write("/etc/cockpit/cockpit.conf", "[WebService]\nOrigins=foo.bar.com\n")
        self.allow_journal_messages("received request from bad Origin: .*",
                                    "Received invalid handshake request from the client")
        self.allow_browser_errors(".*")

        machine.start_cockpit()
        # Really start Cockpit to make sure it has generated all its certificates.
        machine.execute("systemctl start cockpit")

        if safari:
            browser.set_user_agent("Safari/300")

        # Either plant a fake CA file or make sure there is none
        if not cacert:
            machine.execute("rm -f /etc/cockpit/ws-certs.d/0-self-signed-ca.pem")
        else:
            machine.write("/etc/cockpit/ws-certs.d/0-self-signed-ca.pem", "FAKE CERT FOR TESTING\n")

        # Log in.
        browser.open("/system")
        browser.wait_visible("#login")
        browser.set_val('#login-user-input', "admin")
        browser.set_val('#login-password-input', "foobar")
        browser.click('#login-button')
        browser.expect_load()
        browser.wait_present("#content")

        browser.wait_visible("#early-failure")
        # The help is shown only for Safari with a CA file present
        if safari and cacert:
            check_cert_help = browser.wait_visible
        else:
            check_cert_help = browser.wait_not_visible
        check_cert_help("#safari-cert-help")

    @skipBrowser("Enough when only chromium pretends to be a different browser", "firefox")
    @skipImage("Starting on OSTree is weird", "fedora-coreos")
    def testFailingWebsocketSafari(self):
        # Failing WebSocket scenario with a Safari user agent and a CA file present
        self.testFailingWebsocket(cacert=True, safari=True)

    @skipBrowser("Enough when only chromium pretends to be a different browser", "firefox")
    @skipImage("Starting on OSTree is weird", "fedora-coreos")
    def testFailingWebsocketSafariNoCA(self):
        # Failing WebSocket scenario with a Safari user agent but without a CA file
        self.testFailingWebsocket(cacert=False, safari=True)

# Run the tests when this file is executed directly as a script.
# NOTE(review): test_main is presumably provided by the testlib star-import — confirm.
if __name__ == '__main__':
    test_main()
